{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import print_function\n",
    "\n",
    "import time\n",
    "import threading\n",
    "import sys\n",
    "if sys.version >= '3':\n",
    "    import queue as Queue\n",
    "else:\n",
    "    import Queue\n",
    "\n",
    "from pyspark import SparkConf, SparkContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "def delayed(seconds):\n",
    "    def f(x):\n",
    "        time.sleep(seconds)\n",
    "        return x\n",
    "    return f\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def call_in_background(f, *args):\n",
    "    result = Queue.Queue(1)\n",
    "    t = threading.Thread(target=lambda: result.put(f(*args)))\n",
    "    t.daemon = True\n",
    "    t.start()\n",
    "    return result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def main():\n",
    "    conf = SparkConf().set(\"spark.ui.showConsoleProgress\", \"false\")\n",
    "    sc = SparkContext(appName=\"PythonStatusAPIDemo\", conf=conf)\n",
    "\n",
    "        def run():\n",
    "            rdd = sc.parallelize(range(10), 10).map(delayed(2))\n",
    "            reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)\n",
    "            return reduced.map(delayed(2)).collect()\n",
    "\n",
    "    result = call_in_background(run)\n",
    "    status = sc.statusTracker()\n",
    "        while result.empty():\n",
    "            ids = status.getJobIdsForGroup()\n",
    "            for id in ids:\n",
    "                job = status.getJobInfo(id)\n",
    "                print(\"Job\", id, \"status: \", job.status)\n",
    "                for sid in job.stageIds:\n",
    "                    info = status.getStageInfo(sid)\n",
    "                    if info:\n",
    "                        print(\"Stage %d: %d tasks total (%d active, %d complete)\" %\n",
    "                              (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))\n",
    "            time.sleep(1)\n",
    "\n",
    "        print(\"Job results are:\", result.get())\n",
    "        sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 0 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (1 active, 0 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (1 active, 0 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (1 active, 0 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (1 active, 0 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (5 active, 3 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (5 active, 3 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (5 active, 4 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (3 active, 7 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (2 active, 8 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (0 active, 0 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (5 active, 1 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (5 active, 3 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (5 active, 4 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (3 active, 7 complete)\n",
      "Job 0 status:  RUNNING\n",
      "Stage 0: 10 tasks total (0 active, 10 complete)\n",
      "Stage 1: 10 tasks total (2 active, 8 complete)\n",
      "Job results are: [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]\n"
     ]
    }
   ],
   "source": [
    "if __name__ == \"__main__\":\n",
    "    main()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
